package cn.cup.dmp.etl

import cn.cup.dmp.beans.Logs
import cn.cup.dmp.config.ConfigHelper
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * 将原数据文件转化为parquet文件格式
  */
object BzFile2Parquet {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[*]")
      .setAppName("文件格式转化")
      .set("spark.serializer", ConfigHelper.ser)
    //设置序列化方式为"org.apache.spark.serializer.KryoSerializer"

    //注册自定义类的序列化方式为KryoSerializer
    conf.registerKryoClasses(Array(classOf[Logs]))
    val sc = new SparkContext(conf)

    //读取源文件
    val file = sc.textFile(ConfigHelper.srcfilePath)
    val sQLContext = new SQLContext(sc)

    //清洗数据
    val filtered = file.map(f=> f.split(",",-1)).filter(_.length>=85)

    //把数据装入Logs对象中
    val logs: RDD[Logs] = filtered.map(arr => Logs(arr))//定义了apply方法，因此可以直接把array传递给Logs

    //转化为DataFrame，此处还可以通过toDF
    // import sQLContext.implicits._    =>rdd.toDF()
    val dataFrame: DataFrame = sQLContext.createDataFrame(logs)

    //判断文件夹是否存在,如果存在，则删除，也可以设置写的模式为overwrite
    /*val configuration = sc.hadoopConfiguration
    val fs = FileSystem.get(configuration)
    val path = new Path(ConfigHelper.destPath)
    if (fs.exists(path)){
      fs.delete(path,true)
    }*/

    //写入文件
    //dataFrame.write.parquet(ConfigHelper.destPath+"\\Method2_result")
    dataFrame.write.mode("overwrite").parquet(ConfigHelper.destPath+"\\Method2_result")

    //关流
    sc.stop()

  }

}
